
Add worker runs and plan UI #90

Open
schapman1974 wants to merge 1 commit into adelg003:main from schapman1974:suggestion/plan-worker-ui

Conversation


@schapman1974 schapman1974 commented Apr 10, 2026

Adds local worker execution and a fuller run-history UI for Fletcher.

What changed

  • adds fletcher worker mode so the server can launch step execution as child Fletcher processes
  • persists worker logs and exposes them in the UI
  • introduces plan runs and run-step snapshots so each execution is preserved historically
  • refactors the UI to separate plans from plan runs
  • replaces the old graph rendering with an interactive plan widget
  • adds in-place live step status updates instead of refreshing the whole page
  • adds trigger/rerun support from the plan page
  • adds a richer demo workflow with fan-out/fan-in and validation steps
  • logs worker launches in the main Fletcher app logs

Why

Previously, Fletcher mostly tracked orchestration state and did not execute work itself. This change enables Fletcher to
launch worker processes locally, report progress and logs back to the main API, and present each run clearly in the UI.

UI behavior

  • plan page shows the plan structure and the list of runs
  • run page shows the run-specific plan with live state coloring
  • clicking a step shows logs for that step
  • plan view supports zoom, fit, resize-aware layout, and drag-to-pan
  • visible UI wording now uses Plan instead of DAG

Important notes

  • worker callbacks are now attributed to the correct run using the step run_id
  • dataset state transitions are serialized to avoid split/join race conditions that could leave downstream steps stuck in
    waiting
  • includes DB migrations for execution logs and plan runs
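The split/join serialization in the notes above can be illustrated with an in-memory sketch. Fletcher actually serializes transitions through the database; the names (`on_parent_done`, the step names) and the mutex-based approach here are illustrative only. The point is that both parents of a join step must observe state through the same serialized section, so neither can read a stale snapshot and leave the child stuck in waiting.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum State { Waiting, Queued, Success }

// Every transition takes the same lock, so two parents of a join step that
// finish at the same moment cannot both read a stale snapshot and skip
// queueing the child.
fn on_parent_done(
    states: &Mutex<HashMap<&'static str, State>>,
    parent: &'static str,
    child: &'static str,
    parents: &[&'static str],
) {
    let mut s = states.lock().unwrap();
    s.insert(parent, State::Success);
    if s[child] == State::Waiting && parents.iter().all(|p| s[p] == State::Success) {
        s.insert(child, State::Queued);
    }
}

// Both parents finish concurrently; exactly one of them queues the child.
fn demo() -> State {
    let states = Arc::new(Mutex::new(HashMap::from([
        ("validate", State::Waiting),
        ("profile", State::Waiting),
        ("write_parquet", State::Waiting),
    ])));
    let handles: Vec<_> = ["validate", "profile"]
        .into_iter()
        .map(|parent| {
            let states = Arc::clone(&states);
            thread::spawn(move || {
                on_parent_done(&states, parent, "write_parquet", &["validate", "profile"]);
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let s = states.lock().unwrap();
    s["write_parquet"]
}

fn main() {
    println!("child state after both parents finish: {:?}", demo());
}
```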

Demo

The sample workflow now demonstrates:

  1. clean data directory
  2. generate CSV
  3. validate/profile in parallel
  4. write parquet
  5. validate parquet

Summary by CodeRabbit

Release Notes

  • New Features
    • Plan execution run tracking with status summaries and history
    • Execution logging to capture step-level output and events during plan execution
    • Interactive DAG visualization displaying plan step workflows and dependencies
    • Plan details page showing execution logs and run history per step
    • Ability to trigger and rerun plans directly from the UI
    • Local worker integration for executing scripts as part of plan execution

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@schapman1974 schapman1974 requested a review from adelg003 as a code owner April 10, 2026 20:10

coderabbitai bot commented Apr 10, 2026

Walkthrough

This PR introduces a distributed worker execution system with plan-run tracking and execution logging. It adds database schema for plan runs and execution logs, implements a CLI worker runner that executes scripts with streaming log capture, adds UI pages to visualize plan runs as DAGs, and integrates config-aware core functions with transaction-based state snapshots.

Changes

Cohort / File(s) Summary
Configuration & CLI Infrastructure
Cargo.toml, package.json, build.rs, src/main.rs
Added clap CLI parsing, reqwest HTTP client, and extended Tokio features. Build script copies Nice-DAG assets. Main now dispatches to worker CLI mode or server mode, with new worker_service and worker_key config fields.
Database Schema & Migrations
migrations/20260410170000_execution_log.*.sql, migrations/20260410190000_plan_runs.*.sql
Added execution_log table for persisting execution events with foreign key to data_product. Added plan_run, plan_run_data_product (with CASCADE deletes), and plan_run_dependency tables to snapshot plan state and DAG structure per execution run.
Core Business Logic
src/core.rs, src/db.rs
Introduced plan-run creation, state snapshotting into plan_run_data_product, and dataset locking. Added config-aware function variants (*_with_config). Implemented worker launch behavior with local process spawning. Added execution logging and run-status aggregation queries.
Error Handling & Models
src/error.rs, src/model.rs
Added error variants for Io, Join, Reqwest, Uuid, and WorkerExit. Introduced model structs ExecutionLog, ExecutionLogParam, PlanRun, PlanRunDataProduct, PlanRunDependency, and RunPlan.
API Endpoints
src/api.rs, src/ui/mod.rs
Updated handlers to use config-aware core functions. Added POST /data_product/:dataset_id/:data_product_id/log endpoint for execution logging. Registered new UI routes for plan run details, graphs, logs, and triggering.
Worker System
src/worker.rs
New CLI-driven worker module that authenticates, executes scripts (bash/python via uv or python3), streams stdout/stderr logs to remote endpoint, and updates execution state (RunningSuccess/Failed).
UI & DAG Visualization
src/ui/page.rs, scripts/plan-dag.js
Replaced GraphViz with Nice-DAG widget. Added handlers for plan run pages, graph/log fragments, and trigger rerun. Frontend module provides pan/zoom controls, node selection events, polling, and responsive resizing for interactive DAG display.

Sequence Diagram(s)

sequenceDiagram
    participant Client as Web Client
    participant Server as API Server
    participant DB as Database
    participant Worker as Worker Process
    participant RemoteLog as Remote Log Endpoint

    Client->>Server: POST /plan/:dataset_id/trigger
    Server->>DB: Begin Transaction
    Server->>DB: Create plan_run (QUEUED)
    Server->>DB: Queue data_products with run_id
    Server->>DB: Commit
    Server->>Worker: Spawn worker process<br/>(FLETCHER_WORKER_KEY)
    Server-->>Client: Redirect to run page
    
    Worker->>Server: POST /authenticate
    Server-->>Worker: Bearer token
    Worker->>Server: PATCH state to RUNNING
    Worker->>Worker: Execute script<br/>(bash/python)
    
    Worker->>RemoteLog: POST log (stdout)
    RemoteLog->>Server: Log entry
    Server->>DB: Insert execution_log
    
    Worker->>RemoteLog: POST log (stderr)
    RemoteLog->>Server: Log entry
    Server->>DB: Insert execution_log
    
    Worker->>Server: PATCH state to SUCCESS/FAILED<br/>(exit code)
    Server->>DB: Update plan_run status
    Server->>DB: Update plan_run_data_product state
    
    Client->>Server: GET /plan/:dataset_id/run/:plan_run_id
    Server->>DB: Fetch plan_run + data_products
    Server-->>Client: HTML with DAG widget
    
    Client->>Server: Poll /plan/.../nodes
    Server->>DB: Fetch plan_run_data_products
    Server-->>Client: JSON nodes + active status
    Client->>Client: Update DAG widget nodes

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

🐰 A worker hops to distant fields,
Logs each step with precision yield,
Plan runs leap through DAG's domain,
States snap swift, no loop or strain,
Fletcher's flow now scales with grace! 🌟

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'Add worker runs and plan UI' directly summarizes the main changes: introducing worker execution capabilities and a new plan/run UI.
Docstring Coverage ✅ Passed Docstring coverage is 88.46% which is sufficient. The required threshold is 80.00%.




@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 15

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/error.rs (1)

155-401: 🧹 Nitpick | 🔵 Trivial

Consider adding tests for new error variants.

The existing test suite covers many error variants but lacks tests for the newly added Io, Join, Reqwest, Uuid, and WorkerExit variants. Adding tests would ensure the HTTP status code mappings remain correct.

Would you like me to generate tests for these new error variants?

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/error.rs` around lines 155 - 401, Add unit tests for the new Error
variants (Io, Join, Reqwest, Uuid, WorkerExit) mirroring the existing pattern:
for each variant (Error::Io, Error::Join, Error::Reqwest, Error::Uuid,
Error::WorkerExit) construct a representative error instance (e.g.,
std::io::Error for Io, a JoinError for Join, a reqwest error for Reqwest, a
uuid::Error for Uuid, and the WorkerExit variant), call into_poem_error() on the
Error variant, and assert that poem_error.status() equals the expected
poem::http::StatusCode as defined by the into_poem_error mapping; add these
tests to the #[cfg(test)] mod tests alongside the other tests to ensure mappings
remain correct.
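A test of this shape can be sketched with a trimmed-down stand-in. The enum below is hypothetical; the real variants wrap std::io::Error, a tokio JoinError, reqwest::Error, uuid::Error, and a worker exit code, and map to poem status codes via into_poem_error(). The specific status codes here are assumptions, not Fletcher's confirmed mapping.

```rust
// Hypothetical, simplified stand-in for Fletcher's Error type.
#[derive(Debug)]
enum Error {
    Io(std::io::Error),
    Uuid(String),
    WorkerExit(i32),
}

// Assumed mapping: infrastructure failures are 500s, malformed client
// input (a bad UUID) is a 400. The real mapping lives in error.rs.
fn status_code(err: &Error) -> u16 {
    match err {
        Error::Io(_) | Error::WorkerExit(_) => 500,
        Error::Uuid(_) => 400,
    }
}

fn main() {
    let err = Error::WorkerExit(3);
    println!("{err:?} -> {}", status_code(&err));
}
```

The real tests would construct each variant the same way and assert on `into_poem_error().status()` instead of a bare `u16`.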
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@migrations/20260410170000_execution_log.up.sql`:
- Around line 1-10: Add a foreign-key constraint tying execution_log.run_id to
the table that owns step runs/historical snapshots: alter the CREATE TABLE
execution_log to include FOREIGN KEY (run_id, dataset_id, data_product_id)
REFERENCES step_run(run_id, dataset_id, data_product_id) (or the actual run
table name/column names if different) so run_id cannot refer to a non-existent
or mismatched run; ensure the referenced columns on the run table are
unique/primary keys.

In `@migrations/20260410190000_plan_runs.down.sql`:
- Around line 1-4: Make the down migration idempotent by using IF EXISTS on the
DROP TABLE and DROP INDEX statements for plan_run_dependency,
plan_run_data_product, plan_run_dataset_created_idx, and plan_run so the
migration can be re-run safely; the index is already dropped before its parent
table, so the statement order itself does not need to change.

In `@migrations/20260410190000_plan_runs.up.sql`:
- Around line 15-43: Replace the single-column foreign key references on
plan_run_id with a composite foreign key (plan_run_id, dataset_id) that
references plan_run(plan_run_id, dataset_id) in both plan_run_data_product and
plan_run_dependency so rows cannot pair a plan_run_id with a different
dataset_id; specifically, update the FOREIGN KEY in plan_run_data_product
(currently FOREIGN KEY(plan_run_id) ...) to FOREIGN KEY(plan_run_id, dataset_id)
REFERENCES plan_run(plan_run_id, dataset_id) ON DELETE CASCADE, and do the same
for plan_run_dependency (replace its FOREIGN KEY(plan_run_id) ... with a
composite FK on (plan_run_id, dataset_id) referencing plan_run), leaving primary
keys and other columns unchanged.

In `@scripts/plan-dag.js`:
- Around line 81-97: Extract the hardcoded color arrays in stateColors into
named constants at the top of the file (e.g., COLORS_SUCCESS, COLORS_RUNNING,
COLORS_FAILED, COLORS_QUEUED, COLORS_DEFINED, COLORS_WAITING) and replace the
inline array returns in the stateColors function with those constants; ensure
each constant's name conveys the state and includes the same 4-color array
structure so callers of stateColors (stateColors function) keep identical
behavior while improving readability and maintainability.
- Around line 274-283: The global window event listeners for "mousemove",
"mouseup" (and the "resize" listener mentioned later) are never removed and will
leak when a widget is destroyed; fix by creating named/bound handler functions
stored on the record (e.g., record._onMouseMove, record._onMouseUp,
record._onResize) instead of anonymous functions used in
window.addEventListener, register those handlers with window.addEventListener in
the setup (so they close over dragging/startX/startY as needed), and then call
window.removeEventListener with the same stored handler references inside
destroyRecord to unregister them when the widget is torn down.
- Around line 17-18: The call to record.dag.destory() is a typo; inspect the
`@ebay/nice-dag-core` DAG instance API (or its TypeScript defs) to find the
correct cleanup method (likely destroy, dispose, teardown, or close) and replace
the incorrect destory invocation with the correct method on the DAG instance
(update the call site referencing record.dag.destory to record.dag.destroy or
the library's actual cleanup method name and ensure any required arguments or
await/async usage is handled).

In `@src/core.rs`:
- Around line 358-360: The current code path calls
plan_run_active_by_dataset_select(tx, plan.dataset.id) and returns an existing
active run, which causes explicit reruns to be attached to the old run; change
the logic so that when invoking the rerun flow you pass a reuse_active = false
(or otherwise detect an explicit rerun) and, in that case, do NOT call
plan_run_active_by_dataset_select to return the active run—instead create a
fresh plan_run record for the new execution (or alternatively reject the rerun
if you prefer) so each rerun produces a distinct historical run; update the
caller that starts full-plan reruns to pass reuse_active=false (or add a
parameter) and branch in the function that currently checks
plan_run_active_by_dataset_select(tx, plan.dataset.id) to skip returning the
existing run when reuse_active is false.
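The suggested branch can be sketched in isolation. `resolve_run`, `PlanRunId`, and the `reuse_active` flag are illustrative names for the proposed parameter, not Fletcher's actual API:

```rust
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct PlanRunId(u32);

// reuse_active = true: attach to an in-flight run if one exists (implicit
// trigger). reuse_active = false: an explicit rerun always gets a fresh,
// distinct historical run.
fn resolve_run(active: Option<PlanRunId>, reuse_active: bool, fresh: PlanRunId) -> PlanRunId {
    match active {
        Some(run) if reuse_active => run,
        _ => fresh,
    }
}

fn main() {
    // An explicit rerun ignores the active run and starts a new one.
    println!("{:?}", resolve_run(Some(PlanRunId(1)), false, PlanRunId(2)));
}
```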
- Around line 751-758: You currently spawn a child process via command.spawn()
and immediately drop the Child handle (variable child) after logging, which
leaves exited workers unreaped; ensure the Child is awaited/reaped rather than
dropped — either call child.wait() (or .await if using tokio::process::Child) or
hand the Child to a background reaper task (e.g., tokio::spawn or
std::thread::spawn) that waits on the child and logs its exit status; locate the
command.spawn() call and the child variable in the block with the info! log and
replace the immediate drop with an explicit wait or a spawned reaper that owns
child.
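The reaper pattern can be sketched with std::process (the PR itself likely uses tokio::process, where the equivalent is `child.wait().await` in a spawned task; `spawn_and_reap` is an illustrative name and the example assumes a Unix `sh`):

```rust
use std::process::Command;
use std::thread;

// Hand the Child to a background thread that waits on it, so exited
// workers are reaped instead of lingering as zombies when the handle
// is dropped.
fn spawn_and_reap(program: &str, args: &[&str]) -> thread::JoinHandle<Option<i32>> {
    let mut child = Command::new(program)
        .args(args)
        .spawn()
        .expect("failed to spawn worker");
    thread::spawn(move || {
        let status = child.wait().expect("wait on worker failed");
        status.code() // the real server would log this exit status
    })
}

fn main() {
    let reaper = spawn_and_reap("sh", &["-c", "exit 0"]);
    println!("worker exit code: {:?}", reaper.join().unwrap());
}
```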

In `@src/db.rs`:
- Around line 391-426: The selector execution_logs_by_dataset_select performs an
unbounded fetch_all() and must be limited; change its signature to accept
pagination parameters (e.g., limit: i64 and an optional cursor like
after_created_at: DateTime<Utc> and after_log_id: LogId or a single cursor
token), modify the SQL to add a keyset WHERE clause such as (log.created_at,
log.log_id) > ($2, $3) and a LIMIT $4 (keeping ORDER BY log.created_at,
log.log_id), and use the bounded query to return at most limit rows; apply the
same pattern to the other log selector mentioned so UI readers cannot force
unbounded in-memory loads.
- Around line 741-774: The INSERT uses ON CONFLICT DO NOTHING which returns no
row, so fetch_one() throws on duplicate inserts; update
plan_run_dependency_insert to call fetch_optional() for the INSERT and if it
returns None, issue a follow-up SELECT to load the existing PlanRunDependency
(filtering by plan_run_id, parent_id, child_id) and return that result;
reference the function plan_run_dependency_insert, the INSERT query that uses ON
CONFLICT (plan_run_id, parent_id, child_id) DO NOTHING, and the
PlanRunDependency mapping so the code returns the existing row when the INSERT
yielded no row.
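The keyset ("seek") pagination proposed for the log selectors can be sketched in plain Rust. The in-memory `page` function and simplified i64 timestamps are illustrative; the real code would bind the cursor and limit into the SQL query:

```rust
// Mirrors WHERE (log.created_at, log.log_id) > ($2, $3)
//         ORDER BY log.created_at, log.log_id LIMIT $4.
// Rows must already be sorted by (created_at, log_id), as the ORDER BY
// guarantees; Rust tuple comparison matches SQL row-value comparison.
fn page(rows: &[(i64, i64)], after: (i64, i64), limit: usize) -> Vec<(i64, i64)> {
    rows.iter()
        .copied()
        .filter(|&row| row > after)
        .take(limit)
        .collect()
}

fn main() {
    let rows = [(1, 1), (1, 2), (2, 1), (2, 2), (3, 1)];
    // Next page after cursor (1, 2): at most two rows.
    println!("{:?}", page(&rows, (1, 2), 2));
}
```

Unlike OFFSET pagination, the cursor `(created_at, log_id)` stays cheap and stable even as new log rows are appended.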

In `@src/main.rs`:
- Around line 128-129: The current initialization of worker_key uses
dotenvy::var("WORKER_KEY").unwrap_or("abc123".to_string()) which silently
defaults to a known secret; change it to require the env var and fail startup
when missing by replacing the unwrap_or usage for worker_key with a fallible
check (e.g. dotenvy::var("WORKER_KEY").expect("WORKER_KEY must be set") or
propagate the Result so main returns an Err) so that the variable worker_key
cannot be set to a default value and process startup aborts if WORKER_KEY is not
provided.
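A fail-fast variant of that check can be sketched as follows. `load_worker_key` and its error message are illustrative; the real code would feed it the result of `dotenvy::var("WORKER_KEY")`:

```rust
// Reject a missing or empty WORKER_KEY instead of silently falling back
// to a known default secret; the caller turns the Err into a startup abort.
fn load_worker_key(env_value: Option<String>) -> Result<String, String> {
    match env_value {
        Some(key) if !key.trim().is_empty() => Ok(key),
        _ => Err("WORKER_KEY must be set; refusing to start with a default key".to_string()),
    }
}

fn main() {
    // In the real binary: load_worker_key(std::env::var("WORKER_KEY").ok())
    match load_worker_key(Some("s3cret".to_string())) {
        Ok(_) => println!("worker key loaded"),
        Err(e) => eprintln!("startup aborted: {e}"),
    }
}
```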

In `@src/model.rs`:
- Around line 210-227: Replace the free-form String for the stream field with a
dedicated enum to restrict allowed stream values: define a LogStream (or
ExecutionStream) enum with the exact variants the worker emits, derive the
needed traits (Clone, Debug, PartialEq, Enum/GraphQL mapping,
Serialize/Deserialize) and use that enum for ExecutionLog::stream and
ExecutionLogParam::stream (and any related input/output GraphQL types). Update
all places that construct or persist ExecutionLog/ExecutionLogParam to convert
between String and the enum (or propagate the enum directly), add a DB migration
or implement/from_sql and into_sql conversions if the column stays a string, and
update tests/usages to use the enum variants instead of arbitrary strings.
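A minimal shape for that enum, assuming the worker only ever emits stdout and stderr lines (the variant set and string spellings are assumptions to verify against src/worker.rs):

```rust
use std::fmt;
use std::str::FromStr;

// Hypothetical replacement for the free-form String stream field.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum LogStream {
    Stdout,
    Stderr,
}

impl FromStr for LogStream {
    type Err = String;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "stdout" => Ok(LogStream::Stdout),
            "stderr" => Ok(LogStream::Stderr),
            other => Err(format!("unknown stream: {other}")),
        }
    }
}

impl fmt::Display for LogStream {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            LogStream::Stdout => "stdout",
            LogStream::Stderr => "stderr",
        })
    }
}

fn main() {
    let s: LogStream = "stdout".parse().unwrap();
    println!("{s}");
}
```

If the DB column stays TEXT, these FromStr/Display impls are the conversion boundary, and a bad value becomes a parse error instead of a silently persisted arbitrary string.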

In `@src/ui/page.rs`:
- Around line 379-381: The code currently injects raw dag_nodes_json using
PreEscaped in the script tag (class "fletcher-dag-data"), which allows stored
XSS if persisted fields contain "</script>"; stop using PreEscaped and instead
serialize dag_nodes_json with serde_json::to_string and then escape it before
embedding (e.g., run html-escape on the JSON or at minimum replace occurrences
of "</script>" with "<\\/script>" and escape '<', '>', '&'), or alternatively
render it as a safe text node rather than PreEscaped; update the rendering in
page.rs where PreEscaped(dag_nodes_json.to_string()) is used to output the
escaped/encoded JSON so the script block cannot be broken out of.
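The escaping step can be sketched as below. `escape_for_script_tag` is an illustrative helper, not Fletcher's code; it applies the common trick of \u-escaping the characters that can terminate a script block, which keeps the payload valid JSON because those characters only ever occur inside JSON string literals:

```rust
// Defuse "</script>" inside JSON embedded in a <script> tag by
// \u-escaping '&', '<', and '>'. The result is still valid JSON.
fn escape_for_script_tag(json: &str) -> String {
    json.replace('&', "\\u0026")
        .replace('<', "\\u003c")
        .replace('>', "\\u003e")
}

fn main() {
    let payload = r#"{"msg":"</script><script>alert(1)</script>"}"#;
    println!("{}", escape_for_script_tag(payload));
}
```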

In `@src/worker.rs`:
- Around line 331-343: The data-product update currently injects the
worker-local filesystem path via the "link" field (see the JSON payload
constructed in the client.put call for "data_product/{}/update" which references
args.script.display()), which leaks host internals and may be rendered as a
public URL; change the payload so "link" is null (or omit the "link" key)
instead and, if you need to keep the script path for debugging, move it into
internal worker metadata rather than the public data-product update (e.g., store
under an internal worker-only field or a local log), ensuring args.script is no
longer serialized into the outbound JSON.
- Around line 99-100: stream_output() currently awaits post_log() for each line
and uses a reqwest Client with no timeouts; change it to push log lines into an
mpsc channel and spawn a dedicated background task that consumes the channel and
calls post_log() so the read/pipe draining loop never blocks on HTTP; build the
reqwest client with explicit timeouts via Client::builder().connect_timeout(...)
and timeout(...) (or equivalent) to avoid unbounded stalls when creating the
Client in Client::builder().build()? (update: set these timeouts when
constructing the client used by the background sender). Also stop persisting the
local path into the "link" field in update_state() (currently using
args.script.display().to_string()); introduce a new field (e.g., "script_path"
or "local_path") or omit the local filesystem path entirely and ensure "link"
only contains external URLs, updating the state payload construction in
update_state() accordingly so the API tests and semantics are preserved.
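The channel decoupling suggested for stream_output() can be sketched with std::sync::mpsc and a thread (the real worker is async, where the equivalent is a tokio mpsc channel and a spawned task; `start_log_sender` and the collected Vec stand in for the HTTP `post_log` calls):

```rust
use std::sync::mpsc;
use std::thread;

// The pipe-draining loop only pushes lines into the channel; a background
// thread does the slow network send. Here the send is simulated by
// collecting lines, so the sender's work is observable.
fn start_log_sender() -> (mpsc::Sender<String>, thread::JoinHandle<Vec<String>>) {
    let (tx, rx) = mpsc::channel::<String>();
    let handle = thread::spawn(move || {
        let mut sent = Vec::new();
        // Runs until every Sender is dropped, then drains and exits.
        for line in rx {
            // Real code: post_log(&client, &line) with a client built via
            // Client::builder() with connect and request timeouts set.
            sent.push(line);
        }
        sent
    });
    (tx, handle)
}

fn main() {
    let (tx, handle) = start_log_sender();
    tx.send("step started".to_string()).unwrap();
    drop(tx); // closing the channel lets the sender thread drain and exit
    println!("sent {} line(s)", handle.join().unwrap().len());
}
```

Because `tx.send` never blocks on the network, a slow or stalled log endpoint can no longer back-pressure the stdout/stderr draining loop.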

---

Outside diff comments:
In `@src/error.rs`:
- Around line 155-401: Add unit tests for the new Error variants (Io, Join,
Reqwest, Uuid, WorkerExit) mirroring the existing pattern: for each variant
(Error::Io, Error::Join, Error::Reqwest, Error::Uuid, Error::WorkerExit)
construct a representative error instance (e.g., std::io::Error for Io, a
JoinError for Join, a reqwest error for Reqwest, a uuid::Error for Uuid, and the
WorkerExit variant), call into_poem_error() on the Error variant, and assert
that poem_error.status() equals the expected poem::http::StatusCode as defined
by the into_poem_error mapping; add these tests to the #[cfg(test)] mod tests
alongside the other tests to ensure mappings remain correct.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: 7549d74d-f5fa-4f97-b63b-5496883642ab

📥 Commits

Reviewing files that changed from the base of the PR and between 621002e and e2c3121.

⛔ Files ignored due to path filters (2)
  • Cargo.lock is excluded by !**/*.lock
  • package-lock.json is excluded by !**/package-lock.json
📒 Files selected for processing (17)
  • Cargo.toml
  • build.rs
  • migrations/20260410170000_execution_log.down.sql
  • migrations/20260410170000_execution_log.up.sql
  • migrations/20260410190000_plan_runs.down.sql
  • migrations/20260410190000_plan_runs.up.sql
  • package.json
  • scripts/plan-dag.js
  • src/api.rs
  • src/core.rs
  • src/db.rs
  • src/error.rs
  • src/main.rs
  • src/model.rs
  • src/ui/mod.rs
  • src/ui/page.rs
  • src/worker.rs

Comment on lines +1 to +10
CREATE TABLE execution_log (
log_id BIGSERIAL PRIMARY KEY,
dataset_id UUID NOT NULL,
data_product_id UUID NOT NULL,
run_id UUID NOT NULL,
stream TEXT NOT NULL,
message TEXT NOT NULL,
created_by TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
FOREIGN KEY(dataset_id, data_product_id) REFERENCES data_product(dataset_id, data_product_id)

⚠️ Potential issue | 🟠 Major

Add an integrity constraint for run_id.

run_id is now part of the log identity, but this table only constrains (dataset_id, data_product_id). That means a bad callback can persist logs for a nonexistent or wrong run and the row will still be valid. Please tie execution_log.run_id to the table that owns step runs/historical snapshots so logs cannot be misattributed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@migrations/20260410170000_execution_log.up.sql` around lines 1 - 10, Add a
foreign-key constraint tying execution_log.run_id to the table that owns step
runs/historical snapshots: alter the CREATE TABLE execution_log to include
FOREIGN KEY (run_id, dataset_id, data_product_id) REFERENCES step_run(run_id,
dataset_id, data_product_id) (or the actual run table name/column names if
different) so run_id cannot refer to a non-existent or mismatched run; ensure
the referenced columns on the run table are unique/primary keys.

Comment on lines +1 to +4
DROP TABLE plan_run_dependency;
DROP TABLE plan_run_data_product;
DROP INDEX plan_run_dataset_created_idx;
DROP TABLE plan_run;

⚠️ Potential issue | 🟡 Minor

Make the down migration idempotent with IF EXISTS.

The explicit DROP INDEX already runs before DROP TABLE plan_run, so the statement order is safe as written, but none of the statements use IF EXISTS: a partially applied or re-run down migration fails on the first missing object.

Use IF EXISTS throughout, consistent with execution_log.down.sql.

🔧 Proposed fix
-DROP TABLE plan_run_dependency;
-DROP TABLE plan_run_data_product;
-DROP INDEX plan_run_dataset_created_idx;
-DROP TABLE plan_run;
+DROP INDEX IF EXISTS plan_run_dataset_created_idx;
+DROP TABLE IF EXISTS plan_run_dependency;
+DROP TABLE IF EXISTS plan_run_data_product;
+DROP TABLE IF EXISTS plan_run;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@migrations/20260410190000_plan_runs.down.sql` around lines 1 - 4, The DROPs
are ordered improperly: explicitly drop the index plan_run_dataset_created_idx
before dropping its parent table plan_run, and make the down migration
idempotent by using IF EXISTS on the DROP TABLE and DROP INDEX statements for
plan_run_dependency, plan_run_data_product, plan_run_dataset_created_idx, and
plan_run so the migration can be re-run safely.

Comment on lines +15 to +43
CREATE TABLE plan_run_data_product (
plan_run_id UUID NOT NULL,
dataset_id UUID NOT NULL,
data_product_id UUID NOT NULL,
compute compute NOT NULL,
name TEXT NOT NULL,
version TEXT NOT NULL,
eager BOOL NOT NULL,
state state NOT NULL,
step_run_id UUID,
link TEXT,
passback JSONB,
modified_by TEXT NOT NULL,
modified_date TIMESTAMPTZ NOT NULL,
PRIMARY KEY(plan_run_id, data_product_id),
FOREIGN KEY(plan_run_id) REFERENCES plan_run(plan_run_id) ON DELETE CASCADE
);

CREATE INDEX plan_run_data_product_step_run_idx
ON plan_run_data_product(step_run_id);

CREATE TABLE plan_run_dependency (
plan_run_id UUID NOT NULL,
dataset_id UUID NOT NULL,
parent_id UUID NOT NULL,
child_id UUID NOT NULL,
PRIMARY KEY(plan_run_id, parent_id, child_id),
FOREIGN KEY(plan_run_id) REFERENCES plan_run(plan_run_id) ON DELETE CASCADE
);

⚠️ Potential issue | 🟠 Major

Enforce that child dataset_id values match the parent plan_run.

Both snapshot tables persist dataset_id, but the FK only references plan_run_id. That means a bad insert can write (plan_run_id = run_A, dataset_id = dataset_B) and still satisfy the schema, while your read paths filter on both columns and will silently miss those rows.

Constraint fix
 CREATE TABLE plan_run (
   plan_run_id UUID PRIMARY KEY,
   dataset_id UUID NOT NULL,
   status state NOT NULL,
   created_by TEXT NOT NULL,
   created_at TIMESTAMPTZ NOT NULL,
   modified_by TEXT NOT NULL,
   modified_date TIMESTAMPTZ NOT NULL,
-  FOREIGN KEY(dataset_id) REFERENCES dataset(dataset_id)
+  FOREIGN KEY(dataset_id) REFERENCES dataset(dataset_id),
+  UNIQUE(plan_run_id, dataset_id)
 );
@@
 CREATE TABLE plan_run_data_product (
@@
-  FOREIGN KEY(plan_run_id) REFERENCES plan_run(plan_run_id) ON DELETE CASCADE
+  FOREIGN KEY(plan_run_id, dataset_id)
+    REFERENCES plan_run(plan_run_id, dataset_id)
+    ON DELETE CASCADE
 );
@@
 CREATE TABLE plan_run_dependency (
@@
-  FOREIGN KEY(plan_run_id) REFERENCES plan_run(plan_run_id) ON DELETE CASCADE
+  FOREIGN KEY(plan_run_id, dataset_id)
+    REFERENCES plan_run(plan_run_id, dataset_id)
+    ON DELETE CASCADE
 );
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@migrations/20260410190000_plan_runs.up.sql` around lines 15 - 43, Replace the
single-column foreign key references on plan_run_id with a composite foreign key
(plan_run_id, dataset_id) that references plan_run(plan_run_id, dataset_id) in
both plan_run_data_product and plan_run_dependency so rows cannot pair a
plan_run_id with a different dataset_id; specifically, update the FOREIGN KEY in
plan_run_data_product (currently FOREIGN KEY(plan_run_id) ...) to FOREIGN
KEY(plan_run_id, dataset_id) REFERENCES plan_run(plan_run_id, dataset_id) ON
DELETE CASCADE, and do the same for plan_run_dependency (replace its FOREIGN
KEY(plan_run_id) ... with a composite FK on (plan_run_id, dataset_id)
referencing plan_run), leaving primary keys and other columns unchanged.

Comment on lines +17 to +18
if (record.dag && typeof record.dag.destory === "function") {
record.dag.destory();

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check the nice-dag-core library for the correct method name
rg -n "destory|destroy" node_modules/@ebay/nice-dag-core/lib/index.umd.cjs 2>/dev/null | head -20

# Also check the TypeScript definitions if available
fd -e d.ts . node_modules/@ebay/nice-dag-core --exec cat {} 2>/dev/null | rg -n "destory|destroy"

Repository: adelg003/fletcher

Length of output: 43


🏁 Script executed:

# Check package.json for nice-dag-core dependency
cat package.json | grep -A 5 -B 5 nice-dag

# List what's in node_modules for nice-dag
ls -la node_modules/ | grep -i nice-dag

# Alternative: search for package.json files with nice-dag
fd package.json node_modules | xargs grep -l "nice-dag" 2>/dev/null | head -5

Repository: adelg003/fletcher

Length of output: 508


🏁 Script executed:

# View the actual code in scripts/plan-dag.js around lines 17-18
sed -n '10,25p' scripts/plan-dag.js

# Search for any other usages of .destory or .destroy in the file
rg -n "\.destory|\.destroy" scripts/plan-dag.js

Repository: adelg003/fletcher

Length of output: 565


🏁 Script executed:

# Search for nice-dag usage patterns in the repository
rg -n "nice-dag|\.dag\." scripts/plan-dag.js | head -20

Repository: adelg003/fletcher

Length of output: 288


🏁 Script executed:

# Search for TypeScript definition files
fd -e d.ts . --type f | head -20

# Also check for any import statements that might reveal type definitions
rg -n "import.*nice-dag|from.*nice-dag" scripts/plan-dag.js

Repository: adelg003/fletcher

Length of output: 43


🌐 Web query:

@ebay nice-dag-core destroy method API documentation

💡 Result:

@ebay/nice-dag-core is the core library from eBay's Nice-DAG project for creating and managing DOM structures for visualizing Directed Acyclic Graph (DAG) diagrams. It uses Dagre for layout calculations and provides JS APIs for rendering and interactions like dragging, dropping, creating/removing nodes and edges. It is framework-agnostic but has adaptors for React and Vue3. No explicit documentation for a 'destroy' method on the NiceDag or WritableNiceDag API was found in official docs at opensource.ebay.com/nice-dag/docs or npm pages. The available API refs cover configs (NiceDagConfig), React hook (useNiceDag with 'reset' method), and node removal (node.remove). TypeScript defs include WritableNiceDag.d.ts, but contents not surfaced. The GitHub repo (github.com/eBay/nice-dag) links to examples and docs, but no direct 'destroy' method API doc. A destroy/dispose method may exist for cleanup (common in DOM libs), likely on the NiceDag instance returned by core init functions. Check source code in repo/packages/nice-dag-core/lib/WritableNiceDag.d.ts or JS for details. Install via npm i @ebay/nice-dag-core and inspect types/runtime for exact usage. Official docs: https://opensource.ebay.com/nice-dag/docs/ GitHub: https://github.com/eBay/nice-dag NPM: https://www.npmjs.com/package/@ebay/nice-dag-core



🌐 Web query:

site:github.com eBay nice-dag WritableNiceDag destroy method typescript definitions

💡 Result:

No public TypeScript definitions or documentation for a 'WritableNiceDag' interface or its 'destroy' method were found in eBay's nice-dag repository or related sources. The nice-dag library from eBay (https://github.com/eBay/nice-dag) is a TypeScript-based DAG visualization tool with packages like @ebay/nice-dag-core, supporting read-only and editable views, but no explicit mention of 'WritableNiceDag' or a 'destroy' method in available repo pages, README, or issues. It may be an internal interface not exposed publicly, or the term could refer to editable DAG instances without a named 'WritableNiceDag' type. Check the source code directly in the repo's src directories or NPM package types for @ebay/nice-dag-core.


🌐 Web query:

site:npmjs.com @ebay/nice-dag-core 1.0.41 TypeScript destroy

💡 Result:

@ebay/nice-dag-core is a TypeScript library from eBay for creating and maintaining DOM structures to visualize Directed Acyclic Graph (DAG) diagrams and support flow-editor interactions like dragging, dropping, and adding/removing nodes and edges. It uses Dagre for node and edge positioning calculations. The package is framework-agnostic but integrates with wrappers like @ebay/nice-dag-vue3 for Vue 3 and nice-dag-react for React. No public documentation or source code details a 'destroy' method or function in version 1.0.41 (or in the latest listed version, 1.0.34). The npm page lists 35 versions, so 1.0.41 likely exists, but the latest publish is about a year old (2024-03-29). Repository: github.com/eBay/nice-dag (no specific API details found). Detailed docs are referenced as a 'Doc Portal' but were not located in searches. Check the GitHub repo or the installed package's types/source for 'destroy' usage; it is likely a cleanup method for DAG instances given the library's DOM/flow nature.


🌐 Web query:

github.com/eBay/nice-dag src packages nice-dag-core WritableNiceDag destory destroy method source code

💡 Result:

The repository github.com/eBay/nice-dag contains the nice-dag-core package in packages/nice-dag-core/src, which defines WritableNiceDag (a TypeScript class or interface for editable DAG views, size ~2.1KB in lib/WritableNiceDag.d.ts). No explicit source code snippet for a 'destroy' or 'destory' (likely typo for destroy) method in WritableNiceDag was found in search results. However, the library supports node removal via node.remove on individual ViewNode objects in editable mode, as shown in official docs. There is no evidence of a WritableNiceDag-level destroy method for the entire DAG instance; it may not exist or could be named differently (e.g., dispose). Full source is available at https://github.com/eBay/nice-dag/tree/main/packages/nice-dag-core/src. Official docs: https://opensource.ebay.com/nice-dag/docs/.


🏁 Script executed:

# Search the fletcher repo for any other nice-dag usage or comments
rg -i "nice-dag|destory|destroy" . --type js --type ts -B 2 -A 2

Repository: adelg003/fletcher

Length of output: 820


Fix the destory method call; it is likely a typo for destroy or another cleanup method.

The code calls record.dag.destory() (lines 17–18), but this method is not documented in the official @ebay/nice-dag-core API. The spelling destory appears to be a typo; standard cleanup methods are typically named destroy. Check the library's TypeScript definitions or source code to confirm the correct cleanup method name for the DAG instance, then update the code accordingly.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@scripts/plan-dag.js` around lines 17 - 18, The call to record.dag.destory()
is a typo; inspect the `@ebay/nice-dag-core` DAG instance API (or its TypeScript
defs) to find the correct cleanup method (likely destroy, dispose, teardown, or
close) and replace the incorrect destory invocation with the correct method on
the DAG instance (update the call site referencing record.dag.destory to
record.dag.destroy or the library's actual cleanup method name and ensure any
required arguments or await/async usage is handled).

Comment on lines +81 to +97
function stateColors(state) {
switch (state) {
case "success":
return ["#22c55e", "rgba(34, 197, 94, 0.12)", "#166534", "#dcfce7"];
case "running":
return ["#38bdf8", "rgba(56, 189, 248, 0.12)", "#075985", "#e0f2fe"];
case "failed":
return ["#ef4444", "rgba(239, 68, 68, 0.12)", "#991b1b", "#fee2e2"];
case "queued":
return ["#6b7280", "rgba(107, 114, 128, 0.12)", "#374151", "#f3f4f6"];
case "defined":
return ["#d97706", "rgba(245, 158, 11, 0.12)", "#92400e", "#fef3c7"];
case "waiting":
default:
return ["#9ca3af", "rgba(156, 163, 175, 0.12)", "#4b5563", "#f3f4f6"];
}
}

🧹 Nitpick | 🔵 Trivial

Consider extracting color constants for maintainability.

The hardcoded color values in stateColors could be extracted to named constants at the top of the file, making it easier to update the color palette and improving readability.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@scripts/plan-dag.js` around lines 81 - 97, Extract the hardcoded color arrays
in stateColors into named constants at the top of the file (e.g.,
COLORS_SUCCESS, COLORS_RUNNING, COLORS_FAILED, COLORS_QUEUED, COLORS_DEFINED,
COLORS_WAITING) and replace the inline array returns in the stateColors function
with those constants; ensure each constant's name conveys the state and includes
the same 4-color array structure so callers of stateColors (stateColors
function) keep identical behavior while improving readability and
maintainability.

Comment on lines +128 to +129
worker_service: dotenvy::var("WORKER_SERVICE").unwrap_or("local".to_string()),
worker_key: dotenvy::var("WORKER_KEY").unwrap_or("abc123".to_string()),

⚠️ Potential issue | 🟠 Major

Fail closed on missing WORKER_KEY.

Defaulting the worker credential to "abc123" makes the new worker auth path insecure by default. Since this value is propagated into spawned workers and used to mint API access, a missing env var should fail startup instead of silently enabling a well-known secret.

🔐 Suggested fix
-        worker_service: dotenvy::var("WORKER_SERVICE").unwrap_or("local".to_string()),
-        worker_key: dotenvy::var("WORKER_KEY").unwrap_or("abc123".to_string()),
+        worker_service: dotenvy::var("WORKER_SERVICE").unwrap_or_else(|_| "local".to_string()),
+        worker_key: dotenvy::var("WORKER_KEY")?,
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main.rs` around lines 128 - 129, The current initialization of worker_key
uses dotenvy::var("WORKER_KEY").unwrap_or("abc123".to_string()) which silently
defaults to a known secret; change it to require the env var and fail startup
when missing by replacing the unwrap_or usage for worker_key with a fallible
check (e.g. dotenvy::var("WORKER_KEY").expect("WORKER_KEY must be set") or
propagate the Result so main returns an Err) so that the variable worker_key
cannot be set to a default value and process startup aborts if WORKER_KEY is not
provided.
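The fail-closed shape can be sketched in isolation. This is a hedged, self-contained example: `require_env` and `env_or_default` are hypothetical helpers standing in for the dotenvy-based lookups in src/main.rs (std::env is used so the sketch compiles on its own); the point is that a missing credential yields an Err that aborts startup, while a non-secret setting like WORKER_SERVICE may still fall back to a harmless default.

```rust
use std::env;

// Hypothetical stand-ins for the dotenvy lookups in src/main.rs.
// A credential must be present: no well-known default secret.
fn require_env(key: &str) -> Result<String, String> {
    env::var(key).map_err(|_| format!("{key} must be set"))
}

// A non-secret setting may still default safely.
fn env_or_default(key: &str, default: &str) -> String {
    env::var(key).unwrap_or_else(|_| default.to_string())
}

fn main() -> Result<(), String> {
    // PATH is set in any normal environment; the demo variables are not.
    assert!(require_env("PATH").is_ok());
    assert!(require_env("FLETCHER_DEMO_UNSET_KEY").is_err()); // fail closed
    assert_eq!(env_or_default("FLETCHER_DEMO_UNSET_SERVICE", "local"), "local");

    // In main.rs the call site would then read:
    //     worker_key: require_env("WORKER_KEY")?,
    Ok(())
}
```

Propagating the Err from main (rather than calling `.expect`) keeps the startup error path consistent with the rest of the config loading.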

Comment on lines +210 to +227
/// Persisted worker log entry for a data product run
#[derive(Clone, Debug, Object, PartialEq)]
pub struct ExecutionLog {
pub id: i64,
pub data_product_id: DataProductId,
pub run_id: Uuid,
pub stream: String,
pub message: String,
pub created_by: String,
pub created_at: DateTime<Utc>,
}

/// Input for appending worker logs
#[derive(Clone, Object)]
pub struct ExecutionLogParam {
pub run_id: Uuid,
pub stream: String,
pub message: String,

🧹 Nitpick | 🔵 Trivial

Model stream as an enum, not a raw string.

The worker only emits a fixed set of stream values, but the public contract accepts anything here. Using an enum for ExecutionLog.stream/ExecutionLogParam.stream would tighten the API, prevent garbage labels from being persisted, and keep the UI/log queries aligned with the actual producer set.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/model.rs` around lines 210 - 227, Replace the free-form String for the
stream field with a dedicated enum to restrict allowed stream values: define a
LogStream (or ExecutionStream) enum with the exact variants the worker emits,
derive the needed traits (Clone, Debug, PartialEq, Enum/GraphQL mapping,
Serialize/Deserialize) and use that enum for ExecutionLog::stream and
ExecutionLogParam::stream (and any related input/output GraphQL types). Update
all places that construct or persist ExecutionLog/ExecutionLogParam to convert
between String and the enum (or propagate the enum directly), add a DB migration
or implement/from_sql and into_sql conversions if the column stays a string, and
update tests/usages to use the enum variants instead of arbitrary strings.
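A minimal sketch of the closed stream type, assuming the worker emits only "stdout" and "stderr" (that variant set is an assumption; the real code would also derive poem_openapi's Enum and the sqlx conversions if the DB column stays TEXT):

```rust
use std::fmt;
use std::str::FromStr;

// Hypothetical closed type replacing the free-form `stream: String`
// on ExecutionLog / ExecutionLogParam.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LogStream {
    Stdout,
    Stderr,
}

impl fmt::Display for LogStream {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            LogStream::Stdout => write!(f, "stdout"),
            LogStream::Stderr => write!(f, "stderr"),
        }
    }
}

impl FromStr for LogStream {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "stdout" => Ok(LogStream::Stdout),
            "stderr" => Ok(LogStream::Stderr),
            other => Err(format!("unknown log stream: {other}")),
        }
    }
}

fn main() {
    assert_eq!("stdout".parse::<LogStream>().unwrap(), LogStream::Stdout);
    assert_eq!(LogStream::Stderr.to_string(), "stderr");
    // Garbage labels are rejected at the API boundary, not persisted.
    assert!("garbage".parse::<LogStream>().is_err());
}
```

With Display/FromStr in place, the column can stay TEXT and the conversion happens once at the boundary.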

Comment on lines +379 to +381
script type="application/json" class="fletcher-dag-data" {
(PreEscaped(dag_nodes_json.to_string()))
}

⚠️ Potential issue | 🔴 Critical

Stop embedding raw DAG JSON with PreEscaped inside the <script> tag.

dag_nodes_json includes persisted plan/step fields like name and version. If one of those contains </script>, this breaks out of the JSON script block and becomes stored XSS on the plan/run pages.

Safer embedding
+fn json_for_script_tag(json: &str) -> String {
+    json.replace("</", "<\\/")
+        .replace('\u{2028}', "\\u2028")
+        .replace('\u{2029}', "\\u2029")
+}
+
 fn render_dag_widget(
     widget_id: &str,
     dag_nodes_json: &str,
     selection_context: Option<(DatasetId, Uuid, Option<DataProductId>)>,
 ) -> Markup {
@@
             div
                 class="fletcher-dag-canvas overflow-auto rounded-xl bg-base-100 p-4"
                 style="height: min(70vh, 720px);" {}
             script type="application/json" class="fletcher-dag-data" {
-                (PreEscaped(dag_nodes_json.to_string()))
+                (PreEscaped(json_for_script_tag(dag_nodes_json)))
             }
         }
     }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
script type="application/json" class="fletcher-dag-data" {
(PreEscaped(dag_nodes_json.to_string()))
}
fn json_for_script_tag(json: &str) -> String {
json.replace("</", "<\\/")
.replace('\u{2028}', "\\u2028")
.replace('\u{2029}', "\\u2029")
}
fn render_dag_widget(
widget_id: &str,
dag_nodes_json: &str,
selection_context: Option<(DatasetId, Uuid, Option<DataProductId>)>,
) -> Markup {
// ... (previous function body)
script type="application/json" class="fletcher-dag-data" {
(PreEscaped(json_for_script_tag(dag_nodes_json)))
}
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ui/page.rs` around lines 379 - 381, The code currently injects raw
dag_nodes_json using PreEscaped in the script tag (class "fletcher-dag-data"),
which allows stored XSS if persisted fields contain "</script>"; stop using
PreEscaped and instead serialize dag_nodes_json with serde_json::to_string and
then escape it before embedding (e.g., run html-escape on the JSON or at minimum
replace occurrences of "</script>" with "<\\/script>" and escape '<', '>', '&'),
or alternatively render it as a safe text node rather than PreEscaped; update
the rendering in page.rs where PreEscaped(dag_nodes_json.to_string()) is used to
output the escaped/encoded JSON so the script block cannot be broken out of.
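The escaping helper from the suggested fix can be exercised in isolation. Replacing "</" with "<\/" stays valid JSON (a "\/" escape) while preventing a persisted "</script>" from closing the data block; U+2028/U+2029 are escaped because they are legal in JSON but not in pre-ES2019 JavaScript string literals:

```rust
// Same helper as in the suggested fix, runnable standalone.
fn json_for_script_tag(json: &str) -> String {
    json.replace("</", "<\\/")
        .replace('\u{2028}', "\\u2028")
        .replace('\u{2029}', "\\u2029")
}

fn main() {
    // A hostile persisted plan name trying to break out of the script block.
    let payload = r#"{"name":"</script><img src=x onerror=alert(1)>"}"#;
    let safe = json_for_script_tag(payload);
    assert!(!safe.contains("</script>")); // cannot terminate the <script> element
    assert!(safe.contains("<\\/script>")); // still the same JSON string value
}
```

Browsers only end a `<script>` element at a literal "</" sequence, so neutralizing that sequence is sufficient for an application/json data block.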

Comment on lines +99 to +100
let started_at = Instant::now();
let client = Client::builder().build()?;

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
sed -n '98,118p' src/worker.rs
sed -n '285,299p' src/worker.rs
rg -n 'Client::builder\(\)\.build|post_log\(&client, &args, &token, stream, &line\)\.await' src/worker.rs

Repository: adelg003/fletcher

Length of output: 1143


🏁 Script executed:

# Find the post_log function definition
rg -A 10 'async fn post_log' src/

Repository: adelg003/fletcher

Length of output: 431


🏁 Script executed:

# Look for any timeout configurations
rg -n 'timeout|Timeout' src/worker.rs

Repository: adelg003/fletcher

Length of output: 43


🏁 Script executed:

# Check if Client is configured elsewhere with timeout
rg -n 'Client::builder' src/

Repository: adelg003/fletcher

Length of output: 123


🏁 Script executed:

# Look at update_state to understand the link field concern from original scratchpad
rg -B 5 -A 15 'fn update_state' src/worker.rs

Repository: adelg003/fletcher

Length of output: 543


🏁 Script executed:

# Check Cargo.toml for reqwest version and default timeout behavior
cat Cargo.toml | grep -A 5 reqwest

Repository: adelg003/fletcher

Length of output: 374


🏁 Script executed:

# Get the complete post_log function
sed -n '235,270p' src/worker.rs

Repository: adelg003/fletcher

Length of output: 1286


🏁 Script executed:

# Get the complete update_state function with its json body
rg -A 25 'async fn update_state' src/worker.rs

Repository: adelg003/fletcher

Length of output: 717


🏁 Script executed:

# Search for any "link" field assignments that might relate to path disclosure
rg -n '"link"' src/

Repository: adelg003/fletcher

Length of output: 1045


🌐 Web query:

reqwest 0.12.15 default timeout configuration

💡 Result:

In reqwest 0.12.15, the default timeout configuration for the async Client (created via reqwest::Client::new or ClientBuilder::new.build) has no timeout (None). This applies to the overall request timeout (ClientBuilder::timeout), connect timeout (ClientBuilder::connect_timeout), and read timeout (ClientBuilder::read_timeout). Users must explicitly set timeouts if needed, e.g., .timeout(Duration::from_secs(30)). For the blocking client (reqwest::blocking::Client), the default is 30 seconds, but the query likely refers to the primary async client. No changes to this default in 0.12 series per changelog up to 0.12.15.


Don't let log uploads block pipe draining, and fix the "link" field overloading.

The stream_output() function awaits each post_log() HTTP request before reading the next line, blocking pipe draining. The Client is built with no timeout configuration, and reqwest 0.12's default async client has no timeout, making this stall unbounded. Under a slow or wedged API, the worker stops draining the pipe, the child can block on write, and the run hangs indefinitely.

Additionally, the update_state() function persists the local script path into the "link" field via args.script.display().to_string(). The API tests expect "link" to contain external URLs (e.g., https://example.com/run-123), not local filesystem paths, leaking server internals and overloading the field semantics.

Please: (1) decouple log shipping from the read loop or buffer writes, and set explicit connect/request timeouts on the client, and (2) use a separate field or move the script path elsewhere rather than misusing "link".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/worker.rs` around lines 99 - 100, stream_output() currently awaits
post_log() for each line and uses a reqwest Client with no timeouts; change it
to push log lines into an mpsc channel and spawn a dedicated background task
that consumes the channel and calls post_log() so the read/pipe draining loop
never blocks on HTTP; build the reqwest client with explicit timeouts via
Client::builder().connect_timeout(...) and timeout(...) (or equivalent) to avoid
unbounded stalls when creating the Client in Client::builder().build()? (update:
set these timeouts when constructing the client used by the background sender).
Also stop persisting the local path into the "link" field in update_state()
(currently using args.script.display().to_string()); introduce a new field
(e.g., "script_path" or "local_path") or omit the local filesystem path entirely
and ensure "link" only contains external URLs, updating the state payload
construction in update_state() accordingly so the API tests and semantics are
preserved.

Comment on lines +331 to +343
client
.put(api_url(
&args.remote,
&format!("data_product/{}/update", args.dataset_id),
))
.bearer_auth(token)
.json(&vec![json!({
"id": args.data_product_id.to_string(),
"state": state.to_string(),
"run_id": args.run_id.to_string(),
"link": args.script.display().to_string(),
"passback": passback,
})])

⚠️ Potential issue | 🟠 Major

Don't expose the local script path through link.

This stores a server-local filesystem path in a field that is returned by the plan/data-product APIs and has otherwise behaved like a job URL. That leaks host internals to callers and can break any UI that renders link as a hyperlink. Keep link null here or move the script path into internal worker metadata instead.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/worker.rs` around lines 331 - 343, The data-product update currently
injects the worker-local filesystem path via the "link" field (see the JSON
payload constructed in the client.put call for "data_product/{}/update" which
references args.script.display()), which leaks host internals and may be
rendered as a public URL; change the payload so "link" is null (or omit the
"link" key) instead and, if you need to keep the script path for debugging, move
it into internal worker metadata rather than the public data-product update
(e.g., store under an internal worker-only field or a local log), ensuring
args.script is no longer serialized into the outbound JSON.
